// Copyright (C) Kumo inc. and its affiliates.
// Author: Jeff.li lijippy@163.com
// All rights reserved.
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published
// by the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program.  If not, see <https://www.gnu.org/licenses/>.
//

#pragma once

#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <string_view>
#include <vector>

#include <nebula/core/buffer.h>
#include <nebula/types/type_fwd.h>
#include <nebula/csv/options.h>
#include <nebula/csv/type_fwd.h>
#include <turbo/utility/status.h>
#include <turbo/base/macros.h>


namespace nebula::csv {

    /// \brief Skip at most `num_rows` rows from the given input.
    ///
    /// The input pointer is updated through `out_data`, and the number of rows
    /// actually skipped is returned (it may be less than requested if the input
    /// is too short).
    ///
    /// \param[in] data start of the input to scan
    /// \param[in] size length of the input (presumably in bytes -- TODO confirm
    ///            against the implementation)
    /// \param[in] num_rows maximum number of rows to skip
    /// \param[out] out_data receives the updated input pointer
    /// \return the number of rows actually skipped
    TURBO_EXPORT
    int32_t SkipRows(const uint8_t *data, uint32_t size, int32_t num_rows,
                     const uint8_t **out_data);

    // Forward declaration of the parser implementation class.  It is defined
    // elsewhere; in this header it is befriended by detail::DataBatch and held
    // by BlockParser through a std::unique_ptr.
    class BlockParserImpl;

    namespace detail {

        /// \brief Compact descriptor marking a value boundary in the parsed data.
        ///
        /// DataBatch reinterprets its values buffers as arrays of this struct and
        /// reads two consecutive descriptors per value: the first one's `offset`
        /// is where the value starts, the second one's `offset` is where it stops,
        /// and the second one's `quoted` flag is the one reported for the value.
        struct ParsedValueDesc {
            // Offset into the parsed data, stored in 31 bits
            uint32_t offset: 31;
            // One-bit flag handed to visitors as their `quoted` argument
            bool quoted: 1;
        };

        /// \brief Container for the values parsed out of one block.
        ///
        /// Value boundaries are stored as ParsedValueDesc arrays in
        /// `values_buffers_`; the bytes they point into start at `parsed_`.
        /// BlockParserImpl is a friend and fills in the members.
        class TURBO_EXPORT DataBatch {
        public:
            explicit DataBatch(int32_t num_cols) : num_cols_(num_cols) {}

            /// \brief Number of rows held by this batch (skipped rows are not counted)
            [[nodiscard]] int32_t num_rows() const { return num_rows_; }

            /// \brief Number of columns per parsed row
            [[nodiscard]] int32_t num_cols() const { return num_cols_; }

            /// \brief Total size, in bytes, of the parsed data
            [[nodiscard]] uint32_t num_bytes() const { return static_cast<uint32_t>(parsed_size_); }

            /// \brief Number of rows that were skipped
            [[nodiscard]] int32_t num_skipped_rows() const {
                return static_cast<int32_t>(skipped_rows_.size());
            }

            /// \brief Call `visit` on every value of column `col_index`, buffer by buffer
            ///
            /// The visitor is invoked as `visit(data, size, quoted)` and must return a
            /// turbo::Status.  The first non-OK status ends the iteration and is
            /// returned after going through decorate_with_row_number().
            ///
            /// \param col_index index of the column to visit
            /// \param first_row row number of the first row of the batch; a negative
            ///        value disables the "Row #" prefix on errors
            /// \param visit the callable applied to each value
            template<typename Visitor>
            turbo::Status visit_column(int32_t col_index, int64_t first_row, Visitor &&visit) const {
                // Running row index across all values buffers
                int32_t row_in_batch = 0;
                for (const auto &chunk : values_buffers_) {
                    const auto descs = reinterpret_cast<const ParsedValueDesc *>(chunk->data());
                    // A value ends where the following descriptor begins, so a valid
                    // start index must be strictly below the last descriptor index.
                    const auto last_desc =
                            static_cast<int32_t>(chunk->size() / sizeof(ParsedValueDesc)) - 1;
                    int32_t cell = col_index;
                    while (cell < last_desc) {
                        auto begin_offset = descs[cell].offset;
                        auto end_offset = descs[cell + 1].offset;
                        bool is_quoted = descs[cell + 1].quoted;
                        turbo::Status outcome =
                                visit(parsed_ + begin_offset, end_offset - begin_offset, is_quoted);
                        if (TURBO_UNLIKELY(!outcome.ok())) {
                            return decorate_with_row_number(std::move(outcome), first_row,
                                                            row_in_batch);
                        }
                        // Same column, next row
                        cell += num_cols_;
                        ++row_in_batch;
                    }
                }
                return turbo::OkStatus();
            }

            /// \brief Call `visit` on each value of the last row of the last values buffer
            ///
            /// The visitor is invoked as `visit(data, size, quoted)`; the first non-OK
            /// status is returned as is.  The last values buffer is accessed
            /// unconditionally, so `values_buffers_` must not be empty.
            template<typename Visitor>
            turbo::Status visit_last_row(Visitor &&visit) const {
                const auto &last_chunk = values_buffers_.back();
                const auto descs = reinterpret_cast<const ParsedValueDesc *>(last_chunk->data());
                const auto desc_count =
                        static_cast<int32_t>(last_chunk->size() / sizeof(ParsedValueDesc));
                // The last row is described by the final `num_cols_ + 1` descriptors
                const auto row_begin = desc_count - num_cols_ - 1;
                const ParsedValueDesc *row = descs + row_begin;
                for (int32_t col = 0; col < num_cols_; ++col) {
                    auto begin_offset = row[col].offset;
                    auto end_offset = row[col + 1].offset;
                    bool is_quoted = row[col + 1].quoted;
                    TURBO_RETURN_NOT_OK(
                            visit(parsed_ + begin_offset, end_offset - begin_offset, is_quoted));
                }
                return turbo::OkStatus();
            }

        protected:
            /// \brief Prefix an error message with the row number it relates to
            ///
            /// When `first_row` is non-negative the message becomes
            /// "Row #<n>: <original message>", where <n> adds `first_row`, `batch_row`
            /// and the number of skipped rows recorded at or before `batch_row`.
            /// When `first_row` is negative the message is left untouched.
            turbo::Status decorate_with_row_number(turbo::Status &&status, int64_t first_row,
                                                   int32_t batch_row) const {
                if (first_row >= 0) {
                    // `skipped_rows_` is sorted in ascending order by construction, so a
                    // binary search yields the count of skips preceding `batch_row`.
                    const auto first_later_skip =
                            std::upper_bound(skipped_rows_.begin(), skipped_rows_.end(), batch_row);
                    const auto skips_before = first_later_skip - skipped_rows_.begin();
                    status = status.with_message("Row #", batch_row + skips_before + first_row, ": ",
                                                 status.message());
                }
                // Go through return_if so that extra context, when enabled, gets attached
                TURBO_RETURN_IF_(true, std::move(status), TURBO_STRINGIFY(status));
                return std::move(status);
            }

            // Row count of this batch, skipped rows excluded
            int32_t num_rows_ = 0;
            // Column count
            int32_t num_cols_ = 0;

            // XXX should the parsed buffer be padded with 8 or 16 extra zero bytes?
            // That might help with null parsing...
            std::vector<std::shared_ptr<Buffer>> values_buffers_;
            std::shared_ptr<Buffer> parsed_buffer_;
            const uint8_t *parsed_ = nullptr;
            int32_t parsed_size_ = 0;

            // Holds the value of num_rows_ at the moment each row was skipped
            std::vector<int32_t> skipped_rows_;

            friend class ::nebula::csv::BlockParserImpl;
        };

    }  // namespace detail

    /// Default value of the `max_num_rows` constructor argument of BlockParser.
    ///
    /// Declared `inline` so that all translation units including this header
    /// share a single definition instead of each getting its own internal copy.
    inline constexpr int32_t kMaxParserNumRows = 100000;

    /// \class BlockParser
    /// \brief A reusable block-based parser for CSV data
    ///
    /// The parser takes a block of CSV data and delimits rows and fields,
    /// unquoting and unescaping them on the fly.  Parsed data is owned by the
    /// parser, so the original buffer can be discarded after parse() returns.
    ///
    /// If the block is truncated (i.e. not all data can be parsed), it is up
    /// to the caller to arrange the next block to start with the trailing data.
    /// Also, if the previous block ends with CR (0x0d) and a new block starts
    /// with LF (0x0a), the parser will consider the leading newline as an empty
    /// line; the caller should therefore strip it.
    class TURBO_EXPORT BlockParser {
    public:
        /// \brief Create a parser
        ///
        /// \param options parsing options
        /// \param num_cols number of columns; NOTE(review): the -1 default
        ///        presumably means "not known in advance" -- confirm against
        ///        the implementation
        /// \param first_row row number of the first row in the block; the -1
        ///        default corresponds to the "unsupported" value reported by
        ///        first_row_num()
        /// \param max_num_rows maximum number of rows ingested by a parse call
        explicit BlockParser(ParseOptions options, int32_t num_cols = -1,
                             int64_t first_row = -1, int32_t max_num_rows = kMaxParserNumRows);

        /// \brief Create a parser with an explicit memory pool
        ///
        /// Same arguments as the other constructor, plus `pool`
        /// (presumably used for the parser's allocations -- TODO confirm).
        explicit BlockParser(MemoryPool *pool, ParseOptions options, int32_t num_cols = -1,
                             int64_t first_row = -1, int32_t max_num_rows = kMaxParserNumRows);

        // Declared here and defined out of line; `impl_` is a unique_ptr to a
        // type that is only forward-declared in this header.
        ~BlockParser();

        /// \brief Parse a block of data
        ///
        /// Parse a block of CSV data, ingesting up to max_num_rows rows.
        /// The number of bytes actually parsed is returned in out_size.
        turbo::Status parse(std::string_view data, uint32_t *out_size);

        /// \brief Parse sequential blocks of data
        ///
        /// Only the last block is allowed to be truncated.
        /// As with the single-block overload, `out_size` is the output argument
        /// for the parsed size.
        turbo::Status parse(const std::vector<std::string_view> &data, uint32_t *out_size);

        /// \brief Parse the final block of data
        ///
        /// Like parse(), but called with the final block in a file.
        /// The last row may lack a trailing line separator.
        turbo::Status parse_final(std::string_view data, uint32_t *out_size);

        /// \brief Parse the final sequential blocks of data
        ///
        /// Only the last block is allowed to be truncated.
        turbo::Status parse_final(const std::vector<std::string_view> &data, uint32_t *out_size);

        /// \brief Return the number of parsed rows
       [[nodiscard]] int32_t num_rows() const { return parsed_batch().num_rows(); }

        /// \brief Return the number of parsed columns
        [[nodiscard]] int32_t num_cols() const { return parsed_batch().num_cols(); }

        /// \brief Return the total size in bytes of parsed data
        [[nodiscard]] uint32_t num_bytes() const { return parsed_batch().num_bytes(); }

        /// \brief Return the total number of rows including rows which were skipped
        [[nodiscard]] int32_t total_num_rows() const {
            return parsed_batch().num_rows() + parsed_batch().num_skipped_rows();
        }

        /// \brief Return the row number of the first row in the block or -1 if unsupported
        [[nodiscard]] int64_t first_row_num() const;

        /// \brief Visit parsed values in a column
        ///
        /// The signature of the visitor is
        /// turbo::Status(const uint8_t* data, uint32_t size, bool quoted)
        ///
        /// The call is forwarded to the parsed batch together with
        /// first_row_num().  Iteration stops at the first non-OK status returned
        /// by the visitor; when first_row_num() is non-negative that status comes
        /// back with a "Row #<n>: " prefix added to its message.
        template<typename Visitor>
        turbo::Status visit_column(int32_t col_index, Visitor &&visit) const {
            return parsed_batch().visit_column(col_index, first_row_num(),
                                              std::forward<Visitor>(visit));
        }

        /// \brief Visit the parsed values of the last row
        ///
        /// The visitor is called with the same three arguments as in
        /// visit_column().  The call is forwarded to the parsed batch, which
        /// accesses its last values buffer without an emptiness check, so this
        /// presumably requires that at least one row has been parsed.
        template<typename Visitor>
        turbo::Status visit_last_row(Visitor &&visit) const {
            return parsed_batch().visit_last_row(std::forward<Visitor>(visit));
        }

    protected:
        // Owning pointer to the implementation object (type defined elsewhere)
        std::unique_ptr<BlockParserImpl> impl_;

        // Accessor for the batch of parsed values backing the public getters and
        // visitors above; defined out of line.
        [[nodiscard]] const detail::DataBatch &parsed_batch() const;
    };

}  // namespace nebula::csv
